{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feature Store Query Planner Tutorial\n",
    "\n",
    "The query planner in the Hopsworks Feature Store allows to query the feature store with high level functions, such as `get_features`, and `get_featuregroup`. However, it is only intended as a utility tool, and the user can always fall back to using pure SQL to query the feature store.\n",
    "\n",
    "In this notebook we will go over a common \"gotcha\" when using the featurestore query planner: **naming conflicts**. A feature name is only guaranteed to be unique within its feature group and version. The feature name **is not globally unique**; this means that conflicts can occur. In this notebook we will illustrate how to handle querying the feature store when there are conflicts.\n",
    "\n",
    "TLDR;\n",
    "\n",
    "If there is a conflict, you can fall back to using pure SQL to query the feature store, or you can provide extra information to the query planner in order to resolve the conflict."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Inspect two Feature Groups with a Naming Conflict on Column: `team_id`\n",
    "\n",
    "In this example we want to make a feature query that cross two feature groups, that have a common column `team_id` that can be used for joining features together."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import featurestore"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM attendances_features_1 against offline feature store\n",
      "root\n",
      " |-- team_id: integer (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)\n",
      " |-- sum_attendance: float (nullable = true)"
     ]
    }
   ],
   "source": [
    "featurestore.get_featuregroup(\"attendances_features\").printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM teams_features_1 against offline feature store\n",
      "root\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- team_id: integer (nullable = true)\n",
      " |-- team_position: integer (nullable = true)"
     ]
    }
   ],
   "source": [
    "featurestore.get_featuregroup(\"teams_features\").printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Fetch a list of features from two feature groups"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### If there is not any naming conflict, the Query Planner can figure out how to fetch the features and join them"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 4 features from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT sum_attendance, team_budget, average_attendance, team_position FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store\n",
      "root\n",
      " |-- sum_attendance: float (nullable = true)\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)\n",
      " |-- team_position: integer (nullable = true)"
     ]
    }
   ],
   "source": [
    "featurestore.get_features([\"average_attendance\", \"sum_attendance\", \"team_budget\", \"team_position\"]).printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### The Query Planner is only intended as a utility, you can always fall back to pure SQL to query the feature store, which gives you more control"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sum_attendance: float (nullable = true)\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)\n",
      " |-- team_position: integer (nullable = true)"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT sum_attendance, team_budget, average_attendance, team_position \" \\\n",
    "          \"FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id`\").printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### If there is a naming conflict, the query planner will ask us to specify the feature groups to fetch the features from"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Found the feature with name 'team_id' in more than one of the featuregroups of the featurestore: 'demo_featurestore_admin000_featurestore', please specify the optional argument 'featuregroup=', the matched featuregroups were: season_scores_features_1,attendances_features_1,players_features_1,teams_features_1\n",
      "Traceback (most recent call last):\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore.py\", line 367, in get_features\n",
      "    online=online)\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore_impl/core.py\", line 611, in _do_get_features\n",
      "    logical_query_plan.create_logical_plan()\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore_impl/query_planner/logical_query_plan.py\", line 32, in create_logical_plan\n",
      "    self._features_query()\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore_impl/query_planner/logical_query_plan.py\", line 148, in _features_query\n",
      "    featuregroups_parsed.values())\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore_impl/query_planner/query_planner.py\", line 62, in _find_feature\n",
      "    featuregroups_matched_str))\n",
      "hops.featurestore_impl.exceptions.exceptions.FeatureNameCollisionError: Found the feature with name 'team_id' in more than one of the featuregroups of the featurestore: 'demo_featurestore_admin000_featurestore', please specify the optional argument 'featuregroup=', the matched featuregroups were: season_scores_features_1,attendances_features_1,players_features_1,teams_features_1\n",
      "\n"
     ]
    }
   ],
   "source": [
    "featurestore.get_features([\"average_attendance\", \"sum_attendance\", \"team_budget\", \"team_position\", \"team_id\"]).printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### If there is a naming conflict even in the list of feature groups provided, SparkSQL will complain that the conflicting feature must be specified with the full name `tablename_featurename`.\n",
    "\n",
    "By specifying the list of feature groups and their version, we are effectively pruning the query planner to only look inside these feature groups. This reduces the chance of conflicts, but it is not a guaranteed protection against naming conflicts: there can be conflicts inside the selected feature groups as well."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\"Reference 'team_id' is ambiguous, could be: demo_featurestore_admin000_featurestore.attendances_features_1.team_id, demo_featurestore_admin000_featurestore.teams_features_1.team_id.; line 1 pos 36\"\n",
      "Traceback (most recent call last):\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore.py\", line 367, in get_features\n",
      "    online=online)\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore_impl/core.py\", line 617, in _do_get_features\n",
      "    result = _run_and_log_sql(spark, logical_query_plan.sql_str, online=online, featurestore=featurestore)\n",
      "  File \"/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/featurestore_impl/core.py\", line 424, in _run_and_log_sql\n",
      "    return spark.sql(sql_str)\n",
      "  File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/session.py\", line 767, in sql\n",
      "    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)\n",
      "  File \"/srv/hops/spark/python/lib/py4j-src.zip/py4j/java_gateway.py\", line 1257, in __call__\n",
      "    answer, self.gateway_client, self.target_id, self.name)\n",
      "  File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 69, in deco\n",
      "    raise AnalysisException(s.split(': ', 1)[1], stackTrace)\n",
      "pyspark.sql.utils.AnalysisException: \"Reference 'team_id' is ambiguous, could be: demo_featurestore_admin000_featurestore.attendances_features_1.team_id, demo_featurestore_admin000_featurestore.teams_features_1.team_id.; line 1 pos 36\"\n",
      "\n"
     ]
    }
   ],
   "source": [
    "featurestore.get_features([\"average_attendance\", \"sum_attendance\", \"team_budget\", \"team_position\", \"team_id\"], \n",
    "                          featuregroups_version_dict = {\"attendances_features\": 1, \"teams_features\": 1}).printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Same Error Occurs if we Skip the Query Planner and Query Hive Directly to get the list of features, as it is not valid SparkSQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\"Reference 'team_id' is ambiguous, could be: demo_featurestore_admin000_featurestore.attendances_features_1.team_id, demo_featurestore_admin000_featurestore.teams_features_1.team_id.; line 1 pos 36\"\n",
      "Traceback (most recent call last):\n",
      "  File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/session.py\", line 767, in sql\n",
      "    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)\n",
      "  File \"/srv/hops/spark/python/lib/py4j-src.zip/py4j/java_gateway.py\", line 1257, in __call__\n",
      "    answer, self.gateway_client, self.target_id, self.name)\n",
      "  File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 69, in deco\n",
      "    raise AnalysisException(s.split(': ', 1)[1], stackTrace)\n",
      "pyspark.sql.utils.AnalysisException: \"Reference 'team_id' is ambiguous, could be: demo_featurestore_admin000_featurestore.attendances_features_1.team_id, demo_featurestore_admin000_featurestore.teams_features_1.team_id.; line 1 pos 36\"\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT team_budget, sum_attendance, team_id, team_position, average_attendance \" \\\n",
    "          \"FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id`\").printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### To avoid the  error we can specify the conflicting feature name with a full name (`featuregroupname`_`version`.`featurename`)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 5 features from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT team_budget, sum_attendance, attendances_features_1.team_id, team_position, average_attendance FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store\n",
      "root\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- sum_attendance: float (nullable = true)\n",
      " |-- team_id: integer (nullable = true)\n",
      " |-- team_position: integer (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)"
     ]
    }
   ],
   "source": [
    "featurestore.get_features([\"average_attendance\", \"sum_attendance\", \"team_budget\", \"team_position\", \"attendances_features_1.team_id\"], \n",
    "                          featuregroups_version_dict = {\"attendances_features\": 1, \"teams_features\": 1}).printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 5 features from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT team_budget, sum_attendance, attendances_features_1.team_id, team_position, average_attendance FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store\n",
      "root\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- sum_attendance: float (nullable = true)\n",
      " |-- team_id: integer (nullable = true)\n",
      " |-- team_position: integer (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)"
     ]
    }
   ],
   "source": [
    "featurestore.get_features([\"average_attendance\", \"sum_attendance\", \"team_budget\", \"team_position\", \"attendances_features_1.team_id\", \n",
    "                           \"teams_features_1.team_id\"], \n",
    "                          featuregroups_version_dict = {\"attendances_features\": 1, \"teams_features\": 1}).printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 5 features from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT team_budget, sum_attendance, attendances_features_1.team_id, team_position, average_attendance FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store\n",
      "root\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- sum_attendance: float (nullable = true)\n",
      " |-- team_id: integer (nullable = true)\n",
      " |-- team_position: integer (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)"
     ]
    }
   ],
   "source": [
    "featurestore.get_features([\"average_attendance\", \"sum_attendance\", \"team_budget\", \"team_position\", \"attendances_features_1.team_id\"], \n",
    "                          featuregroups_version_dict = {\"attendances_features\": 1, \"teams_features\": 1}).printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- team_budget: float (nullable = true)\n",
      " |-- sum_attendance: float (nullable = true)\n",
      " |-- team_id: integer (nullable = true)\n",
      " |-- team_position: integer (nullable = true)\n",
      " |-- average_attendance: float (nullable = true)"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT team_budget, sum_attendance, attendances_features_1.team_id, team_position, average_attendance \" \\\n",
    "          \"FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id`\").printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}